home *** CD-ROM | disk | FTP | other *** search
- """
- This module provides a command line client for the aptdaemon
- """
- # Copyright (C) 2008-2009 Sebastian Heinlein <devel@glatzor.de>
- #
- # Licensed under the GNU General Public License Version 2
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
- __author__ = "Sebastian Heinlein <devel@glatzor.de>"
-
- import array
- import fcntl
- from gettext import gettext as _
- from optparse import OptionParser
- import os
- import pty
- import termios
- import time
- import tty
- import signal
- import sys
-
- from aptsources.sourceslist import SourceEntry
- import apt_pkg
- import gobject
- import dbus.glib
-
- import aptdaemon
- import client
- import enums
- import policykit1
-
# ANSI escape sequences (ESC is byte 0x1b) used to emphasise text on stderr.
ANSI_BOLD = "\x1b[1m"
ANSI_RESET = "\x1b[0m"
-
-
class ConsoleClient:
    """Command line interface client to aptdaemon.

    The client starts transactions through ``client.AptClient``, renders a
    one-line progress indicator on stderr and, when the transaction allows
    it, bridges the local terminal to a pty whose slave side is handed to
    the transaction.
    """

    # Width used when the size of the terminal cannot be determined.
    _FALLBACK_TERMINAL_WIDTH = 80

    def __init__(self, show_terminal=True, allow_unauthenticated=False):
        """Initialise the client state, the pty pair and the signal handlers.

        Keyword arguments:
        show_terminal -- if True the slave pty is announced to each
            transaction and the local terminal is attached on request
        allow_unauthenticated -- value forwarded to
            ``set_allow_unauthenticated()`` of each monitored transaction
        """
        self._client = client.AptClient()
        self.master_fd, self.slave_fd = pty.openpty()
        # Handler ids of the signals connected to the current transaction
        self._signals = []
        self._transaction = None
        # Source ids of the I/O watches installed while attached
        self._watchers = []
        self._old_tty_mode = None
        self._show_terminal = show_terminal
        self._allow_unauthenticated = allow_unauthenticated
        self._show_status = True
        self._show_progress = True
        self._status = ""
        self._status_details = ""
        self._percent = 0
        self._progress_details = ""
        # Used for a spinning line to indicate a still working transaction
        self._spin_elements = "|/-\\"
        self._spin_cur = -1
        self._spin_stamp = time.time()
        self._terminal_width = self._get_terminal_width()
        # Install the handlers last: they read the attributes set above and
        # could otherwise fire on a half-initialised instance.
        signal.signal(signal.SIGINT, self._on_cancel_signal)
        signal.signal(signal.SIGQUIT, self._on_cancel_signal)
        signal.signal(signal.SIGWINCH, self._on_terminal_resize)

    def install_packages(self, packages):
        """Install the given packages and block until the transaction ends."""
        trans = self._client.install_packages(packages,
                                              exit_handler=self._on_exit)
        self._run_transaction(trans)

    def add_repository(self, line="", sourcesfile=""):
        """Add the repository described by the deb-line to the sources list.

        The line is parsed with ``SourceEntry``; the result of the client
        call is returned unchanged.
        """
        entry = SourceEntry(line)
        return self._client.add_repository(entry.type, entry.uri, entry.dist,
                                           entry.comps, entry.comment,
                                           sourcesfile)

    def add_vendor_key_from_file(self, path):
        """Install the repository key file located at path."""
        trans = self._client.add_vendor_key_from_file(
            path, exit_handler=self._on_exit)
        self._run_transaction(trans)

    def remove_vendor_key(self, fingerprint):
        """Remove the repository key with the given fingerprint."""
        trans = self._client.remove_vendor_key(fingerprint,
                                               exit_handler=self._on_exit)
        self._run_transaction(trans)

    def install_file(self, path):
        """Install the package file located at path."""
        trans = self._client.install_file(path, exit_handler=self._on_exit)
        self._run_transaction(trans)

    def list_trusted_vendor_keys(self):
        """Print the keys of the trusted vendors, one per line."""
        for key in self._client.get_trusted_vendor_keys():
            # A parenthesised single argument prints identically with the
            # Python 2 print statement and the Python 3 print function.
            print(key)

    def remove_packages(self, packages):
        """Remove the given packages and block until the transaction ends."""
        trans = self._client.remove_packages(packages,
                                             exit_handler=self._on_exit)
        self._run_transaction(trans)

    def commit_packages(self, install, reinstall, remove, purge, upgrade):
        """Commit the given package changes in a single transaction."""
        trans = self._client.commit_packages(install, reinstall, remove,
                                             purge, upgrade,
                                             exit_handler=self._on_exit)
        self._run_transaction(trans)

    def update_cache(self):
        """Update the package cache."""
        trans = self._client.update_cache(exit_handler=self._on_exit)
        self._run_transaction(trans)

    def upgrade_system(self):
        """Upgrade the system."""
        trans = self._client.upgrade_system(exit_handler=self._on_exit)
        self._run_transaction(trans)

    def _set_transaction(self, transaction):
        """Monitor the given transaction instead of the previous one."""
        # The stored ids come from connect() on the previous transaction, so
        # they have to be disconnected from that object.
        # NOTE(review): assumes the transaction offers the GObject-style
        # disconnect() counterpart of connect() - confirm against client.py.
        if self._transaction is not None:
            for handler in self._signals:
                self._transaction.disconnect(handler)
        self._transaction = transaction
        self._signals = [
            transaction.connect("allow-terminal", self._on_allow_terminal),
            transaction.connect("status", self._on_status),
            transaction.connect("status-details", self._on_status_details),
            transaction.connect("progress", self._on_progress),
            transaction.connect("progress-details",
                                self._on_progress_details),
        ]
        if self._show_terminal:
            transaction.set_terminal(os.ttyname(self.slave_fd))
        transaction.set_allow_unauthenticated(self._allow_unauthenticated)

    def _on_exit(self, trans, enum):
        """Callback for the exit state of the transaction.

        Writes the final progress line to stderr (if progress is shown) and
        prints the error report for a failed transaction.
        """
        # Make sure to detach the terminal
        self._detach()
        if self._show_progress:
            width = self._terminal_width - 9
            sys.stderr.write("[+] 100%% %s%-*.*s%s\n" % (
                ANSI_BOLD, width, width,
                enums.get_exit_string_from_enum(enum), ANSI_RESET))

        if enum == enums.EXIT_FAILED:
            excep = trans.get_error()
            msg = "%s: %s\n%s\n\n%s" % (
                _("ERROR"),
                enums.get_error_string_from_enum(excep.code),
                enums.get_error_description_from_enum(excep.code),
                excep.details)
            print(msg)

    def _on_allow_terminal(self, transaction, allow_terminal):
        """Callback for the AllowTerminal signal of the transaction."""
        if allow_terminal:
            # Attach only once and only if the user wants to see the terminal
            if self._show_terminal and not self._watchers:
                self._clear_progress()
                self._show_progress = False
                self._attach()
        else:
            self._show_progress = True
            self._detach()

    def _on_status(self, transaction, status):
        """Callback for the Status signal of the transaction."""
        self._status = enums.get_status_string_from_enum(status)
        self._update_progress()

    def _on_status_details(self, transaction, text):
        """Callback for the StatusDetails signal of the transaction."""
        self._status_details = text
        self._update_progress()

    def _on_progress_details(self, transaction, items_done, items_total,
                             bytes_done, bytes_total, speed, eta):
        """Callback for the ProgressDetails signal of the transaction.

        Only the byte counters and the speed are rendered; the item counters
        and eta are accepted but not used.
        """
        if bytes_total and speed:
            self._progress_details = (
                _("Downloaded %sB of %sB at %sB/s") %
                (apt_pkg.SizeToStr(bytes_done),
                 apt_pkg.SizeToStr(bytes_total),
                 apt_pkg.SizeToStr(speed)))
        elif bytes_total:
            self._progress_details = (
                _("Downloaded %sB of %sB") %
                (apt_pkg.SizeToStr(bytes_done),
                 apt_pkg.SizeToStr(bytes_total)))
        else:
            self._progress_details = ""
        self._update_progress()

    def _on_progress(self, transaction, percent):
        """Callback for the Progress signal of the transaction."""
        self._percent = percent
        self._update_progress()

    def _update_progress(self):
        """Redraw the progress line on stderr unless progress is hidden."""
        if not self._show_progress:
            return
        text = ANSI_BOLD + self._status + ANSI_RESET
        if self._status_details:
            text += " " + self._status_details
        if self._progress_details:
            text += " (%s)" % self._progress_details
        # 9 columns are taken by the spinner and the percentage
        text_width = self._terminal_width - 9
        # Spin the progress line (maximum 5 times a second)
        now = time.time()
        if self._spin_stamp + 0.2 < now:
            self._spin_cur = (self._spin_cur + 1) % len(self._spin_elements)
            self._spin_stamp = now
        spinner = self._spin_elements[self._spin_cur]
        # Values above 100 are rendered as an unknown progress
        if self._percent > 100:
            percent = "---"
        else:
            percent = self._percent
        # The trailing carriage return makes the next update overwrite
        # this line.
        sys.stderr.write("[%s] " % spinner +
                         "%3.3s%% " % percent +
                         "%-*.*s" % (text_width, text_width, text) + "\r")

    def _clear_progress(self):
        """Clear progress information on stderr."""
        sys.stderr.write("%-*.*s\r" % (self._terminal_width,
                                       self._terminal_width,
                                       " "))

    def _on_cancel_signal(self, signum, frame):
        """Callback for a cancel signal (SIGINT or SIGQUIT)."""
        if self._transaction:
            self._transaction.cancel()

    def _on_terminal_resize(self, signum, frame):
        """Callback for a changed terminal size (SIGWINCH)."""
        self._terminal_width = self._get_terminal_width()
        self._update_progress()

    def _detach(self):
        """Detach the controlling terminal from aptdaemon.

        Removes the I/O watches installed by _attach() and restores the
        saved terminal mode. Calling it again afterwards is a no-op.
        """
        for wid in self._watchers:
            gobject.source_remove(wid)
        # Forget the removed watches: _on_allow_terminal() only re-attaches
        # while this list is empty, and a second detach must not remove the
        # same source ids again.
        self._watchers = []
        if self._old_tty_mode:
            tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH,
                          self._old_tty_mode)
            self._old_tty_mode = None

    def _attach(self):
        """Attach the controlling terminal to aptdaemon.

        Based on pty.spawn(): stdin is switched to raw mode and data is
        copied from stdin to the pty master and from the master to stdout.
        """
        try:
            self._old_tty_mode = tty.tcgetattr(pty.STDIN_FILENO)
            tty.setraw(pty.STDIN_FILENO)
        except tty.error:    # This is the same as termios.error
            self._old_tty_mode = None
        flags = gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP
        wid = gobject.io_add_watch(pty.STDIN_FILENO, flags,
                                   self._copy_io, self.master_fd)
        self._watchers.append(wid)
        wid = gobject.io_add_watch(self.master_fd, flags,
                                   self._copy_io, pty.STDOUT_FILENO)
        self._watchers.append(wid)

    def _copy_io(self, source, condition, target):
        """Callback to copy data between terminals.

        Returns True to keep the watch while plain input arrives; on any
        other condition the source is closed and False is returned.
        """
        # NOTE(review): a combined condition such as IO_IN | IO_HUP takes
        # the close path, so pending data is not copied in that case.
        if condition == gobject.IO_IN:
            data = os.read(source, 1024)
            if target:
                os.write(target, data)
            return True
        os.close(source)
        return False

    def _get_terminal_width(self):
        """Return the width in characters of the current terminal.

        Falls back to 80 columns if stderr is not connected to a terminal
        or if the terminal reports no usable width.
        """
        try:
            # struct winsize holds four shorts; the second one is the
            # number of columns.
            winsize = fcntl.ioctl(sys.stderr, termios.TIOCGWINSZ,
                                  b"\0" * 8)
            width = array.array("h", winsize)[1]
        except (IOError, OSError, ValueError):
            return self._FALLBACK_TERMINAL_WIDTH
        if width <= 0:
            return self._FALLBACK_TERMINAL_WIDTH
        return width

    def _run_transaction(self, trans):
        """Run the given transaction in blocking mode and catch
        authorization errors.

        A PolicyKit "NotAuthorized" D-Bus error is reported on stdout; any
        other D-Bus error is re-raised with its original traceback.
        """
        self._set_transaction(trans)
        try:
            trans.run(block=True)
        except dbus.exceptions.DBusException as error:
            if error.get_dbus_name() != \
               "org.freedesktop.PolicyKit.Error.NotAuthorized":
                raise
            #FIXME: Improve wording
            msg = "%s: %s\n\n%s" % (_("ERROR"),
                                    _("You are not authorized"),
                                    error.get_dbus_message())
            print(msg)
-
-
def main():
    """Run a command line client for aptdaemon.

    Parses the command line, creates a ConsoleClient and performs the first
    requested action in this order of precedence: system upgrade, cache
    refresh, package changes, add repository, add vendor key, remove vendor
    key, list trusted vendor keys. Without any action the help is printed
    and the process exits with status 1.
    """
    epilog = _("To operate on more than one package put the package "
               "names in quotation marks:\naptdcon --install \"foo bar\"")
    parser = OptionParser(version=aptdaemon.__version__, epilog=epilog)
    parser.add_option("-c", "--refresh", default=False,
                      action="store_true", dest="refresh",
                      help=_("Refresh the cache"))
    parser.add_option("-i", "--install", default="",
                      action="store", type="string", dest="install",
                      help=_("Install the given packages"))
    parser.add_option("", "--reinstall", default="",
                      action="store", type="string", dest="reinstall",
                      help=_("Reinstall the given packages"))
    parser.add_option("-r", "--remove", default="",
                      action="store", type="string", dest="remove",
                      help=_("Remove the given packages"))
    parser.add_option("-p", "--purge", default="",
                      action="store", type="string", dest="purge",
                      help=_("Remove the given packages including "
                             "configuration files"))
    # The help text used to be a copy of the one of --install
    parser.add_option("-u", "--upgrade", default="",
                      action="store", type="string", dest="upgrade",
                      help=_("Upgrade the given packages"))
    parser.add_option("", "--upgrade-system", default=False,
                      action="store_true", dest="dist_upgrade",
                      help=_("Upgrade the system"))
    parser.add_option("", "--add-vendor-key", default="",
                      action="store", type="string", dest="add_vendor_key",
                      help=_("Add the vendor to the trusted ones"))
    parser.add_option("", "--add-repository", default="",
                      action="store", type="string", dest="add_repository",
                      help=_("Add new repository from the given deb-line"))
    parser.add_option("", "--sources-file", action="store", default="",
                      type="string", dest="sources_file",
                      help=_("Specify an alternative sources.list.d file to "
                             "which repositories should be added."))
    parser.add_option("", "--list-trusted-vendors", default=False,
                      action="store_true", dest="list_trusted_vendor_keys",
                      help=_("List trusted vendor keys"))
    parser.add_option("", "--remove-vendor-key", default="",
                      action="store", type="string", dest="remove_vendor_key",
                      help=_("Remove the trusted key of the given fingerprint"))
    parser.add_option("", "--hide-terminal", default=False,
                      action="store_true", dest="hide_terminal",
                      help=_("Do not attach to the apt terminal"))
    parser.add_option("", "--allow-unauthenticated",
                      action="store_true", dest="allow_unauthenticated",
                      default=False,
                      help=_("Allow packages from unauthenticated sources"))
    (options, args) = parser.parse_args()
    # Named "con" so that the imported "client" module is not shadowed
    con = ConsoleClient(show_terminal=not options.hide_terminal,
                        allow_unauthenticated=options.allow_unauthenticated)
    if options.dist_upgrade:
        con.upgrade_system()
    elif options.refresh:
        con.update_cache()
    elif options.install or options.reinstall or options.remove or \
         options.purge or options.upgrade:
        # Each option carries a whitespace separated list of package names
        con.commit_packages(options.install.split(),
                            options.reinstall.split(),
                            options.remove.split(),
                            options.purge.split(),
                            options.upgrade.split())
    elif options.add_repository:
        con.add_repository(options.add_repository, options.sources_file)
    elif options.add_vendor_key:
        #FIXME: Should detect if from stdin or file
        con.add_vendor_key_from_file(options.add_vendor_key)
    elif options.remove_vendor_key:
        con.remove_vendor_key(options.remove_vendor_key)
    elif options.list_trusted_vendor_keys:
        con.list_trusted_vendor_keys()
    else:
        parser.print_help()
        # NOTE(review): assumed to belong to this branch only, so that a
        # performed action does not end with a failure status - confirm.
        sys.exit(1)
-
- # Run the command line client only when this file is executed as a script.
- if __name__ == "__main__":
- main()
-